{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Project 6 - Solution"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The first thing I'm going to do is copy the `parse_data` generator from the pull pipeline example, since we can reuse that code as-is:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import csv\n",
    "\n",
    "def parse_data(f_name):\n",
    "    # the with block closes the file once the generator is exhausted or closed\n",
    "    with open(f_name) as f:\n",
    "        # sniff the dialect (delimiter, quoting) from the first 2000 characters\n",
    "        dialect = csv.Sniffer().sniff(f.read(2000))\n",
    "        f.seek(0)\n",
    "        next(f)  # skip header row\n",
    "        yield from csv.reader(f, dialect=dialect)"
   ]
  },
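  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`parse_data` relies on `csv.Sniffer` to detect the delimiter from the first 2000 characters of the file before skipping the header row. As a quick sanity check, here is a small self-contained sketch of the same steps run against a made-up, semicolon-delimited sample held in memory (the sample rows are illustrative only, not taken from `cars.csv`):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import csv\n",
    "import io\n",
    "\n",
    "# Build a small, made-up semicolon-delimited sample in memory\n",
    "buf = io.StringIO()\n",
    "print('Car;MPG;Cylinders', file=buf)\n",
    "print('Chevrolet Chevelle Malibu;18.0;8', file=buf)\n",
    "print('Buick Skylark 320;15.0;8', file=buf)\n",
    "buf.seek(0)\n",
    "\n",
    "# Same steps as parse_data: sniff a prefix, rewind, skip the header, read the rest\n",
    "dialect = csv.Sniffer().sniff(buf.read(2000))\n",
    "buf.seek(0)\n",
    "next(buf)  # skip header row\n",
    "rows = list(csv.reader(buf, dialect=dialect))\n",
    "\n",
    "print(dialect.delimiter)\n",
    "print(rows)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If the sniffer ever guesses wrong on a real file, `sniff` also accepts a `delimiters` argument that restricts the candidate characters, for example `csv.Sniffer().sniff(sample, delimiters=';,')`."
   ]
  },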
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Since we're going to be creating several coroutines, a decorator that automatically primes them (advances each one to its first `yield`) will be useful:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
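    "from functools import wraps\n",
    "\n",
    "def coroutine(fn):\n",
    "    @wraps(fn)  # preserve the wrapped function's name and docstring\n",
    "    def inner(*args, **kwargs):\n",
    "        coro = fn(*args, **kwargs)\n",
    "        next(coro)  # advance to the first yield so the coroutine can accept send()\n",
    "        return coro\n",
    "    return inner"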
   ]
  },
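  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see why priming matters: a freshly created generator has not yet reached its first `yield`, so sending it a non-`None` value raises a `TypeError`. The sketch below uses a hypothetical `echo` coroutine (not part of the project) to show the failure and how the decorator avoids it:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from functools import wraps\n",
    "\n",
    "def coroutine(fn):\n",
    "    # Same idea as the decorator above: create the generator and advance it to its first yield\n",
    "    @wraps(fn)\n",
    "    def inner(*args, **kwargs):\n",
    "        coro = fn(*args, **kwargs)\n",
    "        next(coro)\n",
    "        return coro\n",
    "    return inner\n",
    "\n",
    "def echo():\n",
    "    # Hypothetical coroutine: collects everything sent to it and yields the list back\n",
    "    received = []\n",
    "    while True:\n",
    "        item = yield received\n",
    "        received.append(item)\n",
    "\n",
    "# Without priming, the generator is still at its start and rejects send('hello')\n",
    "raw = echo()\n",
    "try:\n",
    "    raw.send('hello')\n",
    "    error = None\n",
    "except TypeError as ex:\n",
    "    error = ex\n",
    "\n",
    "# With the decorator, the coroutine is already paused at its yield\n",
    "primed = coroutine(echo)()\n",
    "result = primed.send('hello')\n",
    "\n",
    "print(error)\n",
    "print(result)"
   ]
  },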
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's start writing the various coroutines and functions we'll need for our pipeline:\n",
    "\n",
    "* a coroutine that saves data to a file\n",
    "* a coroutine that filters data based on the vehicle name; we'll make it generic by passing in a filter function (predicate) as an argument\n",
    "* a coroutine that acts as the pipeline\n",
    "* a context manager that opens and closes the pipeline automatically"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
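    "@coroutine\n",
    "def save_csv(f_name):\n",
    "    # the file stays open for as long as the coroutine is alive\n",
    "    with open(f_name, 'w', newline='') as f:\n",
    "        writer = csv.writer(f)\n",
    "        while True:\n",
    "            row = yield  # wait for the next row to be sent in\n",
    "            writer.writerow(row)"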
   ]
  },
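  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "One point worth spelling out: `save_csv` keeps its file open for as long as the coroutine is alive, and calling `close()` on it raises `GeneratorExit` at the paused `yield`, which lets the `with` block close (and flush) the file. Here is a self-contained sketch against a temporary directory (the file name `demo_out.csv` and the rows are made up):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import csv\n",
    "import os\n",
    "import tempfile\n",
    "\n",
    "def coroutine(fn):\n",
    "    # Prime the generator so it is ready to receive values\n",
    "    def inner(*args, **kwargs):\n",
    "        coro = fn(*args, **kwargs)\n",
    "        next(coro)\n",
    "        return coro\n",
    "    return inner\n",
    "\n",
    "@coroutine\n",
    "def save_csv(f_name):\n",
    "    # Same sink as above: the file stays open while the coroutine is alive\n",
    "    with open(f_name, 'w', newline='') as f:\n",
    "        writer = csv.writer(f)\n",
    "        while True:\n",
    "            row = yield\n",
    "            writer.writerow(row)\n",
    "\n",
    "with tempfile.TemporaryDirectory() as tmp:\n",
    "    out_path = os.path.join(tmp, 'demo_out.csv')\n",
    "    sink = save_csv(out_path)\n",
    "    sink.send(['a', '1'])\n",
    "    sink.send(['b', '2'])\n",
    "    # close() raises GeneratorExit at the yield, so the with block exits and closes the file\n",
    "    sink.close()\n",
    "\n",
    "    with open(out_path, newline='') as f:\n",
    "        written = list(csv.reader(f))\n",
    "\n",
    "print(written)"
   ]
  },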
  {
   "cell_type": "code",
   "execution_count": 84,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "@coroutine\n",
    "def filter_data(filter_pred, target):\n",
    "    while True:\n",
    "        row = yield\n",
    "        # forward only the rows the predicate accepts\n",
    "        if filter_pred(row):\n",
    "            target.send(row)"
   ]
  },
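  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Because each `filter_data` coroutine forwards only matching rows to its `target`, stacking several of them means a row must pass *every* predicate to reach the end of the chain. A small sketch, using a hypothetical list-collecting `collect` sink in place of `save_csv` and a few made-up rows:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def coroutine(fn):\n",
    "    # Prime the generator so it is ready to receive values\n",
    "    def inner(*args, **kwargs):\n",
    "        coro = fn(*args, **kwargs)\n",
    "        next(coro)\n",
    "        return coro\n",
    "    return inner\n",
    "\n",
    "@coroutine\n",
    "def filter_data(filter_pred, target):\n",
    "    # Same filter as above: forward the row only if the predicate accepts it\n",
    "    while True:\n",
    "        row = yield\n",
    "        if filter_pred(row):\n",
    "            target.send(row)\n",
    "\n",
    "@coroutine\n",
    "def collect(results):\n",
    "    # Hypothetical sink used for illustration: appends rows to a list\n",
    "    while True:\n",
    "        row = yield\n",
    "        results.append(row)\n",
    "\n",
    "results = []\n",
    "sink = collect(results)\n",
    "# Outer filter checks for 'Chevrolet', inner filter checks for 'Landau'\n",
    "chain = filter_data(lambda r: 'Chevrolet' in r[0],\n",
    "                    filter_data(lambda r: 'Landau' in r[0], sink))\n",
    "\n",
    "for row in (['Chevrolet Monte Carlo Landau', '15.5'],\n",
    "            ['Chevrolet Chevelle Malibu', '18.0'],\n",
    "            ['Ford Landau', '17.0']):\n",
    "    chain.send(row)\n",
    "\n",
    "print(results)"
   ]
  },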
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we need to create our custom pipeline coroutine, which creates the `save_csv` coroutine and one filter coroutine per name filter, chains them together, and then orchestrates the data flow:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 89,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
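    "@coroutine\n",
    "def pipeline_coro(out_file, name_filters):\n",
    "    save = save_csv(out_file)\n",
    "    \n",
    "    # chain the filters in front of the save coroutine\n",
    "    target = save\n",
    "    for name_filter in name_filters:\n",
    "        target = filter_data(lambda d, v=name_filter: v in d[0], target)\n",
    "        # warning: we have to use the default-argument trick above because\n",
    "        # lambdas are closures and name_filter is a free variable shared by\n",
    "        # all of them; without it, every lambda would use the last name_filter\n",
    "        # (we have seen this problem before!)\n",
    "    try:\n",
    "        while True:\n",
    "            received = yield\n",
    "            target.send(received)\n",
    "    finally:\n",
    "        # explicitly close save_csv so the output file is closed (and flushed) right away\n",
    "        save.close()"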
   ]
  },
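  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The comment about the default argument `v=name_filter` refers to late binding: a lambda looks up a free variable when it is *called*, not when it is created, so lambdas built in a loop all see the loop variable's final value. Binding the value as a default parameter captures it at creation time instead. A minimal demonstration using the same filter names (the test string is made up):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "names = ('Chevrolet', 'Landau', 'Carlo')\n",
    "\n",
    "late, bound = [], []\n",
    "for name_filter in names:\n",
    "    # Late binding: reads name_filter when called, i.e. after the loop has finished\n",
    "    late.append(lambda d: name_filter in d)\n",
    "    # Default-argument trick: stores the value name_filter has right now\n",
    "    bound.append(lambda d, v=name_filter: v in d)\n",
    "\n",
    "text = 'Chevrolet Chevelle'\n",
    "print([pred(text) for pred in late])   # every lambda tests for 'Carlo'\n",
    "print([pred(text) for pred in bound])  # each lambda tests for its own name"
   ]
  },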
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we are going to create a context manager to automatically close the pipeline when we are done with it:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 86,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from contextlib import contextmanager\n",
    "\n",
    "@contextmanager\n",
    "def pipeline(out_file, name_filters):\n",
    "    p = pipeline_coro(out_file, name_filters)\n",
    "    try:\n",
    "        yield p\n",
    "    finally:\n",
    "        p.close()"
   ]
  },
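  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Because `p.close()` sits in a `finally` clause, the pipeline is closed even if the body of the `with` block raises. A self-contained sketch with a hypothetical `tracker` coroutine that records whether it was closed, and a hypothetical `managed` context manager following the same pattern as `pipeline`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from contextlib import contextmanager\n",
    "\n",
    "def coroutine(fn):\n",
    "    # Prime the generator so it is ready to receive values\n",
    "    def inner(*args, **kwargs):\n",
    "        coro = fn(*args, **kwargs)\n",
    "        next(coro)\n",
    "        return coro\n",
    "    return inner\n",
    "\n",
    "@coroutine\n",
    "def tracker(log):\n",
    "    # Hypothetical coroutine: records received items and notes when it is closed\n",
    "    try:\n",
    "        while True:\n",
    "            log.append((yield))\n",
    "    finally:\n",
    "        log.append('closed')\n",
    "\n",
    "@contextmanager\n",
    "def managed(coro):\n",
    "    # Same pattern as the pipeline context manager above\n",
    "    try:\n",
    "        yield coro\n",
    "    finally:\n",
    "        coro.close()\n",
    "\n",
    "log = []\n",
    "try:\n",
    "    with managed(tracker(log)) as p:\n",
    "        p.send('row 1')\n",
    "        raise ValueError('simulated failure while processing')\n",
    "except ValueError:\n",
    "    pass\n",
    "\n",
    "print(log)"
   ]
  },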
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And now we can start using the pipeline:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 87,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "with pipeline('out.csv', ('Chevrolet', 'Landau', 'Carlo')) as p:\n",
    "    for row in parse_data('cars.csv'):\n",
    "        p.send(row)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And finally, let's make sure the data was written out correctly:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 88,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Chevrolet Monte Carlo Landau,15.5,8,350.0,170.0,4165.,11.4,77,US\n",
      "Chevrolet Monte Carlo Landau,19.2,8,305.0,145.0,3425.,13.2,78,US\n"
     ]
    }
   ],
   "source": [
    "with open('out.csv') as f:\n",
    "    for row in f:\n",
    "        print(row, end='')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Perfect!"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
